跳到主要内容

Go 实现一个优雅的异步服务包装

有时我们需要一个通用的包装器用于统一的管理服务

// service.go
package util

import (
"context"
"log"
)

// StartServiceAsync is used to start service async
func StartServiceAsync(ctx context.Context, cancelFunc context.CancelFunc, logger log.Logger, serveFn func() error, stopFn func() error) {
if serveFn == nil {
return
}
go func() {
logger.Println("starting service")
go func() {
if err := serveFn(); err != nil {
logger.Printf("error serving service: %s \n", err)
}
if cancelFunc != nil {
cancelFunc()
}
}()

<-ctx.Done()
logger.Println("stopping service")

if stopFn() != nil {
logger.Println("stopping service gracefully")
if err := stopFn(); err != nil {
logger.Printf("error occurred while stopping service: %s \n", err)
}
}
logger.Println("exiting service")
}()
}

使用的时候也很方便

包装 HTTP 服务

// Start is used to start the service
func (s *MockServer) Start(ctx context.Context, cancelFunc context.CancelFunc) error {
s.LogInfo(nil, "starting http mock server on: %s", s.cfg.Address)
listener, err := net.Listen("tcp", s.cfg.Address)
if err != nil {
return err
}

util.StartServiceAsync(ctx, cancelFunc, s.Logger, func() error {
return http.Serve(listener, s)
}, func() error {
return listener.Close()
})
return nil
}

包装 gRPC 服务

// Start is used to start the service
func (s *MockServer) Start(ctx context.Context, cancelFunc context.CancelFunc) error {
s.LogInfo(nil, "stating proto manager")
if err := s.protoManager.Start(ctx, cancelFunc); err != nil {
return err
}

s.LogInfo(nil, "starting gRPC mock server on: %s", s.cfg.Address)
server := grpc.NewServer(grpc.UnknownServiceHandler(s.handleStream))
listener, err := net.Listen("tcp", s.cfg.Address)
if err != nil {
return err
}

util.StartServiceAsync(ctx, cancelFunc, s.Logger.NewLogger("gRPC"), func() error {
return server.Serve(listener)
}, func() error {
server.GracefulStop()
return nil
})
return nil
}

包装 Prometheus 服务

// StartMetricsServer is used to start metric server
func StartMetricsServer(ctx context.Context, cancelFunc context.CancelFunc, logger logger.Logger, address string, registerer prometheus.Registerer) error {
listener, err := net.Listen("tcp", address)
if err != nil {
return err
}
StartServiceAsync(ctx, cancelFunc, logger, func() error {
return http.Serve(listener, promhttp.InstrumentMetricHandler(
registerer, promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{}),
))
}, func() error {
return listener.Close()
})
return nil
}